package service

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"math"
	"math/rand"
	"sort"
	"time"

	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/flexprice/flexprice/internal/api/dto"
	"github.com/flexprice/flexprice/internal/config"
	"github.com/flexprice/flexprice/internal/domain/events"
	"github.com/flexprice/flexprice/internal/domain/feature"
	"github.com/flexprice/flexprice/internal/domain/meter"
	"github.com/flexprice/flexprice/internal/domain/price"
	"github.com/flexprice/flexprice/internal/domain/subscription"
	ierr "github.com/flexprice/flexprice/internal/errors"
	"github.com/flexprice/flexprice/internal/pubsub"
	"github.com/flexprice/flexprice/internal/pubsub/kafka"
	pubsubRouter "github.com/flexprice/flexprice/internal/pubsub/router"
	"github.com/flexprice/flexprice/internal/types"
	"github.com/samber/lo"
	"github.com/shopspring/decimal"
)

// FeatureUsageTrackingService handles feature usage tracking operations for metered events
type FeatureUsageTrackingService interface {
	// Publish an event for feature usage tracking
	PublishEvent(ctx context.Context, event *events.Event, isBackfill bool) error

	// Register message handler with the router
	RegisterHandler(router *pubsubRouter.Router, cfg *config.Configuration)

	// Get detailed usage analytics with filtering, grouping, and time-series data
	GetDetailedUsageAnalytics(ctx context.Context, req *dto.GetUsageAnalyticsRequest) (*dto.GetUsageAnalyticsResponse, error)

	// Reprocess events for a specific customer or with other filters
	ReprocessEvents(ctx context.Context, params *events.ReprocessEventsParams) error
}

type featureUsageTrackingService struct {
	ServiceParams
	pubSub           pubsub.PubSub // Regular PubSub for normal processing
	backfillPubSub   pubsub.PubSub // Dedicated Kafka PubSub for backfill processing
	eventRepo        events.Repository
	featureUsageRepo events.FeatureUsageRepository
}

// NewFeatureUsageTrackingService creates a new feature usage tracking service
func NewFeatureUsageTrackingService(
	params ServiceParams,
	eventRepo events.Repository,
	featureUsageRepo events.FeatureUsageRepository,
) FeatureUsageTrackingService {
	ev := &featureUsageTrackingService{
		ServiceParams:    params,
		eventRepo:        eventRepo,
		featureUsageRepo: featureUsageRepo,
	}

	pubSub, err := kafka.NewPubSubFromConfig(
		params.Config,
		params.Logger,
		params.Config.FeatureUsageTracking.ConsumerGroup,
	)

	if err != nil {
		params.Logger.Fatalw("failed to create pubsub", "error", err)
		return nil
	}
	ev.pubSub = pubSub

	backfillPubSub, err := kafka.NewPubSubFromConfig(
		params.Config,
		params.Logger,
		params.Config.FeatureUsageTracking.ConsumerGroupBackfill,
	)
	if err != nil {
		params.Logger.Fatalw("failed to create backfill pubsub", "error", err)
		return nil
	}
	ev.backfillPubSub = backfillPubSub
	return ev
}

// PublishEvent publishes an event to the feature usage tracking topic
func (s *featureUsageTrackingService) PublishEvent(ctx context.Context, event *events.Event, isBackfill bool) error {
	// Create message payload
	payload, err := json.Marshal(event)
	if err != nil {
		return ierr.WithError(err).
			WithHint("Failed to marshal event for feature usage tracking").
			Mark(ierr.ErrValidation)
	}

	// Create a deterministic partition key based on tenant_id and external_customer_id
	// This ensures all events for the same customer go to the same partition
	partitionKey := event.TenantID
	if event.ExternalCustomerID != "" {
		partitionKey = fmt.Sprintf("%s:%s", event.TenantID, event.ExternalCustomerID)
	}

	// Make UUID truly unique by adding nanosecond precision timestamp and random bytes
	uniqueID := fmt.Sprintf("%s-%d-%d", event.ID, time.Now().UnixNano(), rand.Int63())

	// Use the partition key as the message ID to ensure consistent partitioning
	msg := message.NewMessage(uniqueID, payload)

	// Set metadata for additional context
	msg.Metadata.Set("tenant_id", event.TenantID)
	msg.Metadata.Set("environment_id", event.EnvironmentID)
	msg.Metadata.Set("partition_key", partitionKey)

	if isBackfill {
		pubSub := s.backfillPubSub
		topic := s.Config.FeatureUsageTracking.TopicBackfill

		if pubSub == nil {
			return ierr.NewError("pubsub not initialized").
				WithHint("Please check the config").
				Mark(ierr.ErrSystem)
		}

		s.Logger.Debugw("publishing event for feature usage tracking",
			"event_id", event.ID,
			"event_name", event.EventName,
			"partition_key", partitionKey,
			"topic", topic,
		)

		// Publish to feature usage tracking topic using the backfill PubSub (Kafka)
		if err := pubSub.Publish(ctx, topic, msg); err != nil {
			return ierr.WithError(err).
				WithHint("Failed to publish event for feature usage tracking").
				Mark(ierr.ErrSystem)
		}
	}
	return nil
}

// RegisterHandler registers a handler for the feature usage tracking topic with rate limiting
func (s *featureUsageTrackingService) RegisterHandler(router *pubsubRouter.Router, cfg *config.Configuration) {
	// Add throttle middleware to this specific handler
	throttle := middleware.NewThrottle(cfg.FeatureUsageTracking.RateLimit, time.Second)

	// Add the handler
	router.AddNoPublishHandler(
		"feature_usage_tracking_handler",
		cfg.FeatureUsageTracking.Topic,
		s.pubSub,
		s.processMessage,
		throttle.Middleware,
	)

	s.Logger.Infow("registered event feature usage tracking handler",
		"topic", cfg.FeatureUsageTracking.Topic,
		"rate_limit", cfg.FeatureUsageTracking.RateLimit,
	)

	// Add backfill handler
	if cfg.FeatureUsageTracking.TopicBackfill == "" {
		s.Logger.Warnw("backfill topic not set, skipping backfill handler")
		return
	}

	backfillThrottle := middleware.NewThrottle(cfg.FeatureUsageTracking.RateLimitBackfill, time.Second)
	router.AddNoPublishHandler(
		"feature_usage_tracking_backfill_handler",
		cfg.FeatureUsageTracking.TopicBackfill,
		s.backfillPubSub, // Use the dedicated Kafka backfill PubSub
		s.processMessage,
		backfillThrottle.Middleware,
	)

	s.Logger.Infow("registered event feature usage tracking backfill handler",
		"topic", cfg.FeatureUsageTracking.TopicBackfill,
		"rate_limit", cfg.FeatureUsageTracking.RateLimitBackfill,
		"pubsub_type", "kafka",
	)
}

// Process a single event message for feature usage tracking
func (s *featureUsageTrackingService) processMessage(msg *message.Message) error {
	// Extract tenant ID from message metadata
	partitionKey := msg.Metadata.Get("partition_key")
	tenantID := msg.Metadata.Get("tenant_id")
	environmentID := msg.Metadata.Get("environment_id")

	s.Logger.Debugw("processing event from message queue",
		"message_uuid", msg.UUID,
		"partition_key", partitionKey,
		"tenant_id", tenantID,
		"environment_id", environmentID,
	)

	// Create a background context with tenant ID
	ctx := context.Background()
	if tenantID != "" {
		ctx = context.WithValue(ctx, types.CtxTenantID, tenantID)
	}

	if environmentID != "" {
		ctx = context.WithValue(ctx, types.CtxEnvironmentID, environmentID)
	}

	// Unmarshal the event
	var event events.Event
	if err := json.Unmarshal(msg.Payload, &event); err != nil {
		s.Logger.Errorw("failed to unmarshal event for feature usage tracking",
			"error", err,
			"message_uuid", msg.UUID,
		)
		return nil // Don't retry on unmarshal errors
	}

	// validate tenant id
	if event.TenantID != tenantID {
		s.Logger.Errorw("invalid tenant id",
			"expected", tenantID,
			"actual", event.TenantID,
			"message_uuid", msg.UUID,
		)
		return nil // Don't retry on invalid tenant id
	}

	// Process the event
	if err := s.processEvent(ctx, &event); err != nil {
		s.Logger.Errorw("failed to process event for feature usage tracking",
			"error", err,
			"event_id", event.ID,
			"event_name", event.EventName,
		)
		return err // Return error for retry
	}

	s.Logger.Infow("event for feature usage tracking processed successfully",
		"event_id", event.ID,
		"event_name", event.EventName,
	)

	return nil
}

// Process a single event for feature usage tracking
func (s *featureUsageTrackingService) processEvent(ctx context.Context, event *events.Event) error {
	s.Logger.Debugw("processing event",
		"event_id", event.ID,
		"event_name", event.EventName,
		"external_customer_id", event.ExternalCustomerID,
		"ingested_at", event.IngestedAt,
	)

	featureUsage, err := s.prepareProcessedEvents(ctx, event)
	if err != nil {
		s.Logger.Errorw("failed to prepare feature usage",
			"error", err,
			"event_id", event.ID,
		)
		return err
	}

	if len(featureUsage) > 0 {
		if err := s.featureUsageRepo.BulkInsertProcessedEvents(ctx, featureUsage); err != nil {
			return err
		}
	}

	return nil
}

// Generate a unique hash for deduplication
// there are 2 cases:
// 1. event_name + event_id // for non COUNT_UNIQUE aggregation types
// 2. event_name + event_field_name + event_field_value // for COUNT_UNIQUE aggregation types
func (s *featureUsageTrackingService) generateUniqueHash(event *events.Event, meter *meter.Meter) string {
	hashStr := fmt.Sprintf("%s:%s", event.EventName, event.ID)

	// For meters with field-based aggregation, include the field value in the hash
	if meter.Aggregation.Type == types.AggregationCountUnique && meter.Aggregation.Field != "" {
		if fieldValue, ok := event.Properties[meter.Aggregation.Field]; ok {
			hashStr = fmt.Sprintf("%s:%s:%v", event.EventName, meter.Aggregation.Field, fieldValue)
		}
	}

	hash := sha256.Sum256([]byte(hashStr))
	return hex.EncodeToString(hash[:])
}

func (s *featureUsageTrackingService) prepareProcessedEvents(ctx context.Context, event *events.Event) ([]*events.FeatureUsage, error) {
	subscriptionService := NewSubscriptionService(s.ServiceParams)

	// Create a base processed event
	baseProcessedEvent := event.ToProcessedEvent()

	// Results slice - will contain either a single skipped event or multiple processed events
	results := make([]*events.FeatureUsage, 0)

	// CASE 1: Lookup customer
	customer, err := s.CustomerRepo.GetByLookupKey(ctx, event.ExternalCustomerID)
	if err != nil {
		s.Logger.Warnw("customer not found for event, skipping",
			"event_id", event.ID,
			"external_customer_id", event.ExternalCustomerID,
			"error", err,
		)
		// Simply skip the event if customer not found
		// TODO: add sentry span for customer not found
		return results, nil
	}

	// Set the customer ID in the event if it's not already set
	if event.CustomerID == "" {
		event.CustomerID = customer.ID
		baseProcessedEvent.CustomerID = customer.ID
	}

	// CASE 2: Get active subscriptions
	filter := types.NewSubscriptionFilter()
	filter.CustomerID = customer.ID
	filter.WithLineItems = true
	filter.Expand = lo.ToPtr(string(types.ExpandPrices) + "," + string(types.ExpandMeters) + "," + string(types.ExpandFeatures))
	filter.SubscriptionStatus = []types.SubscriptionStatus{
		types.SubscriptionStatusActive,
		types.SubscriptionStatusTrialing,
	}

	subscriptionsList, err := subscriptionService.ListSubscriptions(ctx, filter)
	if err != nil {
		s.Logger.Errorw("failed to get subscriptions",
			"event_id", event.ID,
			"customer_id", customer.ID,
			"error", err,
		)
		// TODO: add sentry span for failed to get subscriptions
		return results, err
	}

	subscriptions := subscriptionsList.Items
	if len(subscriptions) == 0 {
		s.Logger.Debugw("no active subscriptions found for customer, skipping",
			"event_id", event.ID,
			"customer_id", customer.ID,
		)
		// TODO: add sentry span for no active subscriptions found
		return results, nil
	}

	// Filter subscriptions to only include those that are active for the event timestamp
	validSubscriptions := make([]*dto.SubscriptionResponse, 0)
	for _, sub := range subscriptions {
		if s.isSubscriptionValidForEvent(sub, event) {
			validSubscriptions = append(validSubscriptions, sub)
		}
	}

	subscriptions = validSubscriptions
	if len(subscriptions) == 0 {
		s.Logger.Debugw("no subscriptions valid for event timestamp, skipping",
			"event_id", event.ID,
			"customer_id", customer.ID,
			"event_timestamp", event.Timestamp,
		)
		return results, nil
	}

	// Collect all price IDs and meter IDs from subscription line items
	priceIDs := make([]string, 0)
	meterIDs := make([]string, 0)
	subLineItemMap := make(map[string]*subscription.SubscriptionLineItem) // Map price_id -> line item

	// Extract price IDs and meter IDs from all subscription line items in a single pass
	for _, sub := range subscriptions {
		for _, item := range sub.LineItems {
			if !item.IsUsage() || !item.IsActive(event.Timestamp) {
				continue
			}

			subLineItemMap[item.PriceID] = item
			priceIDs = append(priceIDs, item.PriceID)
		}
	}

	// Remove duplicates
	priceIDs = lo.Uniq(priceIDs)

	// Fetch all prices in bulk
	priceFilter := types.NewNoLimitPriceFilter().
		WithPriceIDs(priceIDs).
		WithStatus(types.StatusPublished).
		WithExpand(string(types.ExpandMeters))

	prices, err := s.PriceRepo.List(ctx, priceFilter)
	if err != nil {
		s.Logger.Errorw("failed to get prices",
			"error", err,
			"event_id", event.ID,
			"price_count", len(priceIDs),
		)
		return results, err
	}

	// Build price map and collect meter IDs
	priceMap := make(map[string]*price.Price)
	for _, p := range prices {
		if !p.IsUsage() {
			continue
		}
		priceMap[p.ID] = p
		if p.MeterID != "" {
			meterIDs = append(meterIDs, p.MeterID)
		}
	}

	// Remove duplicate meter IDs
	meterIDs = lo.Uniq(meterIDs)

	// Fetch all meters in bulk
	meterFilter := types.NewNoLimitMeterFilter()
	meterFilter.MeterIDs = meterIDs

	meters, err := s.MeterRepo.List(ctx, meterFilter)
	if err != nil {
		s.Logger.Errorw("failed to get meters",
			"error", err,
			"event_id", event.ID,
			"meter_count", len(meterIDs),
		)
		return results, err
	}

	// Build meter map
	meterMap := make(map[string]*meter.Meter)
	for _, m := range meters {
		meterMap[m.ID] = m
	}

	// Build feature maps
	featureMap := make(map[string]*feature.Feature)      // Map feature_id -> feature
	featureMeterMap := make(map[string]*feature.Feature) // Map meter_id -> feature

	if len(meterMap) > 0 {
		featureFilter := types.NewNoLimitFeatureFilter()
		featureFilter.MeterIDs = lo.Keys(meterMap)
		features, err := s.FeatureRepo.List(ctx, featureFilter)
		if err != nil {
			s.Logger.Errorw("failed to get features",
				"error", err,
				"event_id", event.ID,
				"meter_count", len(meterMap),
			)
			// TODO: add sentry span for failed to get features
			return results, err
		}

		for _, f := range features {
			featureMap[f.ID] = f
			featureMeterMap[f.MeterID] = f
		}
	}

	// Process the event against each subscription
	featureUsagePerSub := make([]*events.FeatureUsage, 0)

	for _, sub := range subscriptions {
		// Calculate the period ID for this subscription (epoch-ms of period start)
		periodID, err := types.CalculatePeriodID(
			event.Timestamp,
			sub.StartDate,
			sub.CurrentPeriodStart,
			sub.CurrentPeriodEnd,
			sub.BillingAnchor,
			sub.BillingPeriodCount,
			sub.BillingPeriod,
		)
		if err != nil {
			s.Logger.Errorw("failed to calculate period id",
				"event_id", event.ID,
				"subscription_id", sub.ID,
				"error", err,
			)
			// TODO: add sentry span for failed to calculate period id
			continue
		}

		// Get active usage-based line items
		subscriptionLineItems := lo.Filter(sub.LineItems, func(item *subscription.SubscriptionLineItem, _ int) bool {
			return item.IsUsage() && item.IsActive(event.Timestamp)
		})

		if len(subscriptionLineItems) == 0 {
			s.Logger.Debugw("no active usage-based line items found for subscription",
				"event_id", event.ID,
				"subscription_id", sub.ID,
			)
			continue
		}

		// Collect relevant prices for matching
		prices := make([]*price.Price, 0, len(subscriptionLineItems))
		for _, item := range subscriptionLineItems {
			if price, ok := priceMap[item.PriceID]; ok {
				if event.Timestamp.Before(item.StartDate) || (!item.EndDate.IsZero() && event.Timestamp.After(item.EndDate)) {
					continue
				}
				prices = append(prices, price)
			} else {
				s.Logger.Warnw("price not found for subscription line item",
					"event_id", event.ID,
					"subscription_id", sub.ID,
					"line_item_id", item.ID,
					"price_id", item.PriceID,
				)
				// Skip this item but continue with others - don't fail the whole batch
				continue
			}
		}

		// Find meters and prices that match this event
		matches := s.findMatchingPricesForEvent(event, prices, meterMap)

		if len(matches) == 0 {
			s.Logger.Debugw("no matching prices/meters found for subscription",
				"event_id", event.ID,
				"subscription_id", sub.ID,
				"event_name", event.EventName,
			)
			continue
		}

		for _, match := range matches {
			// Find the corresponding line item
			lineItem, ok := subLineItemMap[match.Price.ID]
			if !ok {
				s.Logger.Warnw("line item not found for price",
					"event_id", event.ID,
					"subscription_id", sub.ID,
					"price_id", match.Price.ID,
				)
				continue
			}

			// Create a unique hash for deduplication
			uniqueHash := s.generateUniqueHash(event, match.Meter)

			// TODO: Check for duplicate events also maybe just call for COUNT_UNIQUE and not all cases

			// Create a new processed event for each match
			featureUsageCopy := &events.FeatureUsage{
				Event:          *event,
				SubscriptionID: sub.ID,
				SubLineItemID:  lineItem.ID,
				PriceID:        match.Price.ID,
				MeterID:        match.Meter.ID,
				PeriodID:       periodID,
				UniqueHash:     uniqueHash,
				Sign:           1, // Default to positive sign
			}

			// Set feature ID if available
			if feature, ok := featureMeterMap[match.Meter.ID]; ok {
				featureUsageCopy.FeatureID = feature.ID
			} else {
				s.Logger.Warnw("feature not found for meter",
					"event_id", event.ID,
					"meter_id", match.Meter.ID,
				)
				continue
			}

			// Extract quantity based on meter aggregation
			quantity, _ := s.extractQuantityFromEvent(event, match.Meter, sub.Subscription, periodID)

			// Validate the quantity is positive and within reasonable bounds
			if quantity.IsNegative() {
				s.Logger.Warnw("negative quantity calculated, setting to zero",
					"event_id", event.ID,
					"meter_id", match.Meter.ID,
					"calculated_quantity", quantity.String(),
				)
				quantity = decimal.Zero
			}

			// Store original quantity
			featureUsageCopy.QtyTotal = quantity

			featureUsagePerSub = append(featureUsagePerSub, featureUsageCopy)
		}
	}

	// Return all processed events
	if len(featureUsagePerSub) > 0 {
		s.Logger.Debugw("event processing request prepared",
			"event_id", event.ID,
			"feature_usage_count", len(featureUsagePerSub),
		)
		return featureUsagePerSub, nil
	}

	// If we got here, no events were processed
	return results, nil
}

// Find matching prices for an event based on meter configuration and filters
func (s *featureUsageTrackingService) findMatchingPricesForEvent(
	event *events.Event,
	prices []*price.Price,
	meterMap map[string]*meter.Meter,
) []PriceMatch {
	matches := make([]PriceMatch, 0)

	// Find prices with associated meters
	for _, price := range prices {
		if !price.IsUsage() {
			continue
		}

		meter, ok := meterMap[price.MeterID]
		if !ok || meter == nil {
			s.Logger.Warnw("feature usage tracking: meter not found for price",
				"event_id", event.ID,
				"price_id", price.ID,
				"meter_id", price.MeterID,
			)
			continue
		}

		// Skip if meter doesn't match the event name
		if meter.EventName != event.EventName {
			continue
		}

		// Check meter filters
		if !s.checkMeterFilters(event, meter.Filters) {
			continue
		}

		// Add to matches
		matches = append(matches, PriceMatch{
			Price: price,
			Meter: meter,
		})
	}

	// Sort matches by filter specificity (most specific first)
	sort.Slice(matches, func(i, j int) bool {
		// Calculate priority based on filter count
		priorityI := len(matches[i].Meter.Filters)
		priorityJ := len(matches[j].Meter.Filters)

		if priorityI != priorityJ {
			return priorityI > priorityJ
		}

		// Tie-break using price ID for deterministic ordering
		return matches[i].Price.ID < matches[j].Price.ID
	})

	return matches
}

// Check if an event matches the meter filters
func (s *featureUsageTrackingService) checkMeterFilters(event *events.Event, filters []meter.Filter) bool {
	if len(filters) == 0 {
		return true // No filters means everything matches
	}

	for _, filter := range filters {
		propertyValue, exists := event.Properties[filter.Key]
		if !exists {
			return false
		}

		// Convert property value to string for comparison
		propStr := fmt.Sprintf("%v", propertyValue)

		// Check if the value is in the filter values
		if !lo.Contains(filter.Values, propStr) {
			return false
		}
	}

	return true
}

// Extract quantity from event based on meter aggregation
// Returns the quantity and the string representation of the field value
func (s *featureUsageTrackingService) extractQuantityFromEvent(
	event *events.Event,
	meter *meter.Meter,
	subscription *subscription.Subscription,
	periodID uint64,
) (decimal.Decimal, string) {
	switch meter.Aggregation.Type {
	case types.AggregationCount:
		// For count, always return 1 and empty string for field value
		return decimal.NewFromInt(1), ""

	case types.AggregationSum, types.AggregationAvg, types.AggregationLatest, types.AggregationMax:
		if meter.Aggregation.Field == "" {
			s.Logger.Warnw("aggregation with empty field name",
				"event_id", event.ID,
				"meter_id", meter.ID,
				"aggregation_type", meter.Aggregation.Type,
			)
			return decimal.Zero, ""
		}

		val, ok := event.Properties[meter.Aggregation.Field]
		if !ok {
			s.Logger.Warnw("property not found for aggregation",
				"event_id", event.ID,
				"meter_id", meter.ID,
				"field", meter.Aggregation.Field,
				"aggregation_type", meter.Aggregation.Type,
			)
			return decimal.Zero, ""
		}

		// Convert value to decimal and string with detailed error handling
		decimalValue, stringValue := s.convertValueToDecimal(val, event, meter)
		return decimalValue, stringValue

	case types.AggregationSumWithMultiplier:
		if meter.Aggregation.Field == "" {
			s.Logger.Warnw("sum_with_multiplier aggregation with empty field name",
				"event_id", event.ID,
				"meter_id", meter.ID,
			)
			return decimal.Zero, ""
		}

		if meter.Aggregation.Multiplier == nil {
			s.Logger.Warnw("sum_with_multiplier aggregation without multiplier",
				"event_id", event.ID,
				"meter_id", meter.ID,
			)
			return decimal.Zero, ""
		}

		val, ok := event.Properties[meter.Aggregation.Field]
		if !ok {
			s.Logger.Warnw("property not found for sum_with_multiplier aggregation",
				"event_id", event.ID,
				"meter_id", meter.ID,
				"field", meter.Aggregation.Field,
			)
			return decimal.Zero, ""
		}

		// Convert value to decimal and apply multiplier
		decimalValue, stringValue := s.convertValueToDecimal(val, event, meter)
		if decimalValue.IsZero() {
			return decimal.Zero, stringValue
		}

		// Apply multiplier
		result := decimalValue.Mul(*meter.Aggregation.Multiplier)
		return result, stringValue

	case types.AggregationCountUnique:
		if meter.Aggregation.Field == "" {
			s.Logger.Warnw("count_unique aggregation with empty field name",
				"event_id", event.ID,
				"meter_id", meter.ID,
			)
			return decimal.Zero, ""
		}

		val, ok := event.Properties[meter.Aggregation.Field]
		if !ok {
			s.Logger.Warnw("property not found for count_unique aggregation",
				"event_id", event.ID,
				"meter_id", meter.ID,
				"field", meter.Aggregation.Field,
			)
			return decimal.Zero, ""
		}

		// For count_unique, we return 1 if the value exists (uniqueness is handled at aggregation level)
		// and convert the value to string for tracking
		stringValue := s.convertValueToString(val)
		return decimal.NewFromInt(1), stringValue
	case types.AggregationWeightedSum:
		if meter.Aggregation.Field == "" {
			s.Logger.Warnw("weighted_sum aggregation with empty field name",
				"event_id", event.ID,
				"meter_id", meter.ID,
			)
			return decimal.Zero, ""
		}

		val, ok := event.Properties[meter.Aggregation.Field]
		if !ok {
			s.Logger.Warnw("property not found for weighted_sum aggregation",
				"event_id", event.ID,
				"meter_id", meter.ID,
				"field", meter.Aggregation.Field,
			)
			return decimal.Zero, ""
		}

		// Convert value to decimal and apply multiplier
		decimalValue, stringValue := s.convertValueToDecimal(val, event, meter)
		if decimalValue.IsZero() {
			return decimal.Zero, stringValue
		}

		// Apply multiplier
		result, err := s.getTotalUsageForWeightedSumAggregation(subscription, event, decimalValue, periodID)
		if err != nil {
			return decimal.Zero, stringValue
		}
		return result, stringValue
	default:
		s.Logger.Warnw("unsupported aggregation type",
			"event_id", event.ID,
			"meter_id", meter.ID,
			"aggregation_type", meter.Aggregation.Type,
		)
		return decimal.Zero, ""
	}
}

// convertValueToDecimal converts a property value to decimal and string representation
func (s *featureUsageTrackingService) convertValueToDecimal(val interface{}, event *events.Event, meter *meter.Meter) (decimal.Decimal, string) {
	var decimalValue decimal.Decimal
	var stringValue string

	switch v := val.(type) {
	case float64:
		decimalValue = decimal.NewFromFloat(v)
		stringValue = fmt.Sprintf("%f", v)

	case float32:
		decimalValue = decimal.NewFromFloat32(v)
		stringValue = fmt.Sprintf("%f", v)

	case int:
		decimalValue = decimal.NewFromInt(int64(v))
		stringValue = fmt.Sprintf("%d", v)

	case int64:
		decimalValue = decimal.NewFromInt(v)
		stringValue = fmt.Sprintf("%d", v)

	case int32:
		decimalValue = decimal.NewFromInt(int64(v))
		stringValue = fmt.Sprintf("%d", v)

	case uint:
		// Convert uint to int64 safely
		decimalValue = decimal.NewFromInt(int64(v))
		stringValue = fmt.Sprintf("%d", v)

	case uint64:
		// Convert uint64 to string then parse to ensure no overflow
		str := fmt.Sprintf("%d", v)
		var err error
		decimalValue, err = decimal.NewFromString(str)
		if err != nil {
			s.Logger.Warnw("failed to parse uint64 as decimal",
				"event_id", event.ID,
				"meter_id", meter.ID,
				"value", v,
				"error", err,
			)
			return decimal.Zero, str
		}
		stringValue = str

	case string:
		var err error
		decimalValue, err = decimal.NewFromString(v)
		if err != nil {
			s.Logger.Warnw("failed to parse string as decimal",
				"event_id", event.ID,
				"meter_id", meter.ID,
				"value", v,
				"error", err,
			)
			return decimal.Zero, v
		}
		stringValue = v

	case json.Number:
		var err error
		decimalValue, err = decimal.NewFromString(string(v))
		if err != nil {
			s.Logger.Warnw("failed to parse json.Number as decimal",
				"event_id", event.ID,
				"meter_id", meter.ID,
				"value", v,
				"error", err,
			)
			return decimal.Zero, string(v)
		}
		stringValue = string(v)

	default:
		// Try to convert to string representation
		stringValue = fmt.Sprintf("%v", v)
		s.Logger.Warnw("unknown type for aggregation - cannot convert to decimal",
			"event_id", event.ID,
			"meter_id", meter.ID,
			"field", meter.Aggregation.Field,
			"aggregation_type", meter.Aggregation.Type,
			"type", fmt.Sprintf("%T", v),
			"value", stringValue,
		)
		return decimal.Zero, stringValue
	}

	return decimalValue, stringValue
}

// convertValueToString converts a property value to string representation
func (s *featureUsageTrackingService) convertValueToString(val interface{}) string {
	switch v := val.(type) {
	case string:
		return v
	case float64, float32, int, int64, int32, uint, uint64:
		return fmt.Sprintf("%v", v)
	case json.Number:
		return string(v)
	default:
		return fmt.Sprintf("%v", v)
	}
}

// GetDetailedUsageAnalytics provides detailed usage analytics with filtering, grouping, and time-series data
func (s *featureUsageTrackingService) GetDetailedUsageAnalytics(ctx context.Context, req *dto.GetUsageAnalyticsRequest) (*dto.GetUsageAnalyticsResponse, error) {
	// Validate the request
	if req.ExternalCustomerID == "" {
		return nil, ierr.NewError("external_customer_id is required").
			WithHint("External customer ID is required").
			Mark(ierr.ErrValidation)
	}

	// Set default window size if needed
	windowSize := req.WindowSize
	if windowSize != "" {
		if err := windowSize.Validate(); err != nil {
			return nil, err
		}
	}

	// Step 1: Resolve customer
	customer, err := s.CustomerRepo.GetByLookupKey(ctx, req.ExternalCustomerID)
	if err != nil {
		return nil, ierr.WithError(err).
			WithHint("Customer not found").
			WithReportableDetails(map[string]interface{}{
				"external_customer_id": req.ExternalCustomerID,
			}).
			Mark(ierr.ErrNotFound)
	}

	// Step 2: Get active subscriptions for the customer with line items (single call)
	subscriptionService := NewSubscriptionService(s.ServiceParams)
	filter := types.NewSubscriptionFilter()
	filter.CustomerID = customer.ID
	filter.WithLineItems = true // Get line items in single call
	filter.SubscriptionStatus = []types.SubscriptionStatus{
		types.SubscriptionStatusActive,
		types.SubscriptionStatusTrialing,
	}

	subscriptionsList, err := subscriptionService.ListSubscriptions(ctx, filter)
	if err != nil {
		s.Logger.Errorw("failed to get subscriptions for analytics",
			"error", err,
			"customer_id", customer.ID,
		)
		return nil, err
	}

	// Step 3: Validate currency and extract subscription data
	finalCurrency := ""
	subscriptions := subscriptionsList.Items
	if len(subscriptions) > 0 {
		currency := subscriptions[0].Currency
		for _, sub := range subscriptions {
			if sub.Currency != currency {
				return nil, ierr.NewError("multiple currencies detected").
					WithHint("Analytics is only supported for customers with a single currency across all subscriptions").
					WithReportableDetails(map[string]interface{}{
						"customer_id": customer.ID,
					}).
					Mark(ierr.ErrValidation)
			}
		}
		finalCurrency = currency
	}

	// Step 4: Create the parameters object for repository call
	params := &events.UsageAnalyticsParams{
		TenantID:           types.GetTenantID(ctx),
		EnvironmentID:      types.GetEnvironmentID(ctx),
		CustomerID:         customer.ID,
		ExternalCustomerID: req.ExternalCustomerID,
		FeatureIDs:         req.FeatureIDs,
		Sources:            req.Sources,
		StartTime:          req.StartTime,
		EndTime:            req.EndTime,
		GroupBy:            req.GroupBy,
		WindowSize:         windowSize,
		PropertyFilters:    req.PropertyFilters,
	}

	// Step 5: Identify MAX bucket features using subscription data (no additional call)
	// Convert SubscriptionResponse to subscription.Subscription
	subscriptionDomains := make([]*subscription.Subscription, len(subscriptions))
	for i, subResp := range subscriptions {
		subscriptionDomains[i] = subResp.Subscription
	}

	maxBucketFeatures, err := s.identifyMaxBucketFeaturesFromSubscriptions(ctx, params, subscriptionDomains)
	if err != nil {
		s.Logger.Errorw("failed to identify MAX bucket features",
			"error", err,
			"external_customer_id", req.ExternalCustomerID,
		)
		return nil, err
	}

	// Step 6: Call the repository to get analytics data with MAX bucket handling
	analytics, err := s.featureUsageRepo.GetDetailedUsageAnalytics(ctx, params, maxBucketFeatures)
	if err != nil {
		s.Logger.Errorw("failed to get detailed usage analytics",
			"error", err,
			"external_customer_id", req.ExternalCustomerID,
		)
		return nil, err
	}

	// Step 7: If no results, return early
	if len(analytics) == 0 {
		return s.ToGetUsageAnalyticsResponseDTO(ctx, analytics)
	}

	// Step 8: Enrich with feature, meter, and subscription-specific price data
	err = s.enrichAnalyticsWithFeatureMeterAndPriceData(ctx, analytics, subscriptionDomains)
	if err != nil {
		s.Logger.Warnw("failed to fully enrich analytics with feature and meter data",
			"error", err,
			"analytics_count", len(analytics),
		)
		// Continue with partial data rather than failing completely
	}

	// Step 9: Set currency on all analytics items if we have subscription data
	if len(subscriptions) > 0 && finalCurrency != "" {
		for _, item := range analytics {
			item.Currency = finalCurrency
		}
	}

	return s.ToGetUsageAnalyticsResponseDTO(ctx, analytics)
}

// identifyMaxBucketFeaturesFromSubscriptions identifies MAX bucket features from customer's active subscriptions
func (s *featureUsageTrackingService) identifyMaxBucketFeaturesFromSubscriptions(ctx context.Context, params *events.UsageAnalyticsParams, subscriptions []*subscription.Subscription) (map[string]*events.MaxBucketFeatureInfo, error) {
	// If subscriptions not provided, fetch them
	if subscriptions == nil {
		subscriptionService := NewSubscriptionService(s.ServiceParams)
		filter := types.NewSubscriptionFilter()
		filter.CustomerID = params.CustomerID
		filter.WithLineItems = true // We need line items to get feature information
		filter.SubscriptionStatus = []types.SubscriptionStatus{
			types.SubscriptionStatusActive,
			types.SubscriptionStatusTrialing,
		}

		subscriptionsList, err := subscriptionService.ListSubscriptions(ctx, filter)
		if err != nil {
			return nil, ierr.WithError(err).
				WithHint("Failed to get subscriptions for MAX bucket feature identification").
				Mark(ierr.ErrDatabase)
		}
		// Convert SubscriptionResponse to subscription.Subscription
		subscriptionResponses := subscriptionsList.Items
		subscriptions = make([]*subscription.Subscription, len(subscriptionResponses))
		for i, subResp := range subscriptionResponses {
			subscriptions[i] = subResp.Subscription
		}
	}

	// Extract all meter IDs from subscription line items
	meterIDSet := make(map[string]bool)
	for _, sub := range subscriptions {
		for _, lineItem := range sub.LineItems {
			if lineItem.IsUsage() && lineItem.MeterID != "" {
				meterIDSet[lineItem.MeterID] = true
			}
		}
	}

	// Convert to slice
	meterIDs := make([]string, 0, len(meterIDSet))
	for meterID := range meterIDSet {
		meterIDs = append(meterIDs, meterID)
	}

	if len(meterIDs) == 0 {
		return make(map[string]*events.MaxBucketFeatureInfo), nil
	}

	// Get features by meter IDs with meter expansion for efficiency
	featureService := NewFeatureService(s.ServiceParams)
	featureFilter := types.NewNoLimitFeatureFilter()
	featureFilter.MeterIDs = meterIDs
	featureFilter.Expand = lo.ToPtr(string(types.ExpandMeters)) // Expand meters in single call

	featuresResponse, err := featureService.GetFeatures(ctx, featureFilter)
	if err != nil {
		return nil, ierr.WithError(err).
			WithHint("Failed to fetch features with meters for MAX bucket identification").
			Mark(ierr.ErrDatabase)
	}

	// Identify MAX with bucket features directly
	maxBucketFeatures := make(map[string]*events.MaxBucketFeatureInfo)
	for _, featureResponse := range featuresResponse.Items {
		feature := featureResponse.Feature
		// Check if feature has meter and meter is expanded
		if feature.Type == types.FeatureTypeMetered && feature.MeterID != "" && featureResponse.Meter != nil {
			meter := featureResponse.Meter.ToMeter()
			// Check if this is a MAX aggregation with bucket size
			if meter.Aggregation.Type == types.AggregationMax && meter.Aggregation.BucketSize != "" {
				maxBucketFeatures[feature.ID] = &events.MaxBucketFeatureInfo{
					FeatureID:    feature.ID,
					MeterID:      meter.ID,
					BucketSize:   meter.Aggregation.BucketSize,
					EventName:    meter.EventName,
					PropertyName: meter.Aggregation.Field,
				}
			}
		}
	}

	return maxBucketFeatures, nil
}

// enrichAnalyticsWithFeatureMeterAndPriceData fetches and adds feature, meter, and subscription-specific price data to analytics results
func (s *featureUsageTrackingService) enrichAnalyticsWithFeatureMeterAndPriceData(ctx context.Context, analytics []*events.DetailedUsageAnalytic, subscriptions []*subscription.Subscription) error {
	// Extract all unique feature IDs
	featureIDs := make([]string, 0)
	featureIDSet := make(map[string]bool)

	for _, item := range analytics {
		if item.FeatureID != "" && !featureIDSet[item.FeatureID] {
			featureIDs = append(featureIDs, item.FeatureID)
			featureIDSet[item.FeatureID] = true
		}
	}

	// If no feature IDs, nothing to enrich
	if len(featureIDs) == 0 {
		return nil
	}

	// Fetch all features in one call
	featureFilter := types.NewNoLimitFeatureFilter()
	featureFilter.FeatureIDs = featureIDs
	features, err := s.FeatureRepo.List(ctx, featureFilter)
	if err != nil {
		return ierr.WithError(err).
			WithHint("Failed to fetch features for enrichment").
			Mark(ierr.ErrDatabase)
	}

	// Extract all meter IDs from features
	meterIDs := make([]string, 0)
	meterIDSet := make(map[string]bool)
	featureMap := make(map[string]*feature.Feature)

	for _, f := range features {
		featureMap[f.ID] = f
		if f.MeterID != "" && !meterIDSet[f.MeterID] {
			meterIDs = append(meterIDs, f.MeterID)
			meterIDSet[f.MeterID] = true
		}
	}

	// Fetch all meters in one call
	meterFilter := types.NewNoLimitMeterFilter()
	meterFilter.MeterIDs = meterIDs
	meters, err := s.MeterRepo.List(ctx, meterFilter)
	if err != nil {
		return ierr.WithError(err).
			WithHint("Failed to fetch meters for enrichment").
			Mark(ierr.ErrDatabase)
	}

	// Create meter map for quick lookup
	meterMap := make(map[string]*meter.Meter)
	for _, m := range meters {
		meterMap[m.ID] = m
	}

	// Build subscription-specific price mapping from line items
	meterToPriceMap := make(map[string]*price.Price) // meter_id -> price
	priceService := NewPriceService(s.ServiceParams)

	// Collect all price IDs from subscription line items
	priceIDs := make([]string, 0)
	priceIDSet := make(map[string]bool)

	for _, sub := range subscriptions {
		for _, lineItem := range sub.LineItems {
			if lineItem.IsUsage() && lineItem.MeterID != "" && lineItem.PriceID != "" {
				if !priceIDSet[lineItem.PriceID] {
					priceIDs = append(priceIDs, lineItem.PriceID)
					priceIDSet[lineItem.PriceID] = true
				}
			}
		}
	}

	// Fetch subscription-specific prices
	if len(priceIDs) > 0 {
		priceFilter := types.NewNoLimitPriceFilter()
		priceFilter.PriceIDs = priceIDs
		priceFilter.WithStatus(types.StatusPublished)
		priceFilter.AllowExpiredPrices = true // Include expired prices for historical data
		prices, err := s.PriceRepo.List(ctx, priceFilter)
		if err != nil {
			return ierr.WithError(err).
				WithHint("Failed to fetch subscription prices for cost calculation").
				Mark(ierr.ErrDatabase)
		}

		// Create price map for quick lookup by meter ID
		for _, p := range prices {
			if p.MeterID != "" {
				meterToPriceMap[p.MeterID] = p
			}
		}
	}

	// Enrich analytics with feature, meter, and subscription-specific price data
	for _, item := range analytics {
		if feature, ok := featureMap[item.FeatureID]; ok {
			item.FeatureName = feature.Name
			item.Unit = feature.UnitSingular
			item.UnitPlural = feature.UnitPlural

			if meter, ok := meterMap[feature.MeterID]; ok {
				item.MeterID = meter.ID
				item.EventName = meter.EventName
				item.AggregationType = meter.Aggregation.Type

				// Set the correct TotalUsage based on meter's aggregation type

				// Calculate cost using subscription-specific price
				if price, hasPricing := meterToPriceMap[meter.ID]; hasPricing {
					// Check if this is a bucketed max meter
					if meter.IsBucketedMaxMeter() {
						// For bucketed features, extract values from points and use CalculateBucketedCost
						var cost decimal.Decimal

						if len(item.Points) > 0 {
							// When window size is provided, use points as buckets
							bucketedValues := make([]decimal.Decimal, len(item.Points))
							for i, point := range item.Points {
								bucketedValues[i] = point.Usage
							}

							// Calculate total cost using bucketed values
							cost = priceService.CalculateBucketedCost(ctx, price, bucketedValues)

							// Calculate cost for each point individually (each point represents a bucket)
							for i := range item.Points {
								pointUsage := item.Points[i].Usage
								pointCost := priceService.CalculateCost(ctx, price, pointUsage)
								item.Points[i].Cost = pointCost
							}
						} else {
							// When no window size is provided, treat TotalUsage as a single bucket
							// This is a fallback for bucketedMax meters when no time windowing is requested
							totalUsage := item.TotalUsage
							if totalUsage.IsPositive() {
								// Treat the total usage as a single bucket for cost calculation
								bucketedValues := []decimal.Decimal{totalUsage}
								cost = priceService.CalculateBucketedCost(ctx, price, bucketedValues)
							} else {
								cost = decimal.Zero
							}
						}

						item.TotalCost = cost
						item.Currency = price.Currency
					} else {
						// For non-bucketed features, use regular CalculateCost
						item.TotalUsage = s.getCorrectUsageValue(item, meter.Aggregation.Type)

						cost := priceService.CalculateCost(ctx, price, item.TotalUsage)
						item.TotalCost = cost
						item.Currency = price.Currency

						// Also calculate cost for each point in the time series if points exist
						for i := range item.Points {
							pointUsage := s.getCorrectUsageValueForPoint(item.Points[i], meter.Aggregation.Type)
							pointCost := priceService.CalculateCost(ctx, price, pointUsage)
							item.Points[i].Cost = pointCost
						}
					}
				}
			}
		}
	}

	return nil
}

// getCorrectUsageValue returns the correct usage value based on the meter's aggregation type
func (s *featureUsageTrackingService) getCorrectUsageValue(item *events.DetailedUsageAnalytic, aggregationType types.AggregationType) decimal.Decimal {
	switch aggregationType {
	case types.AggregationCountUnique:
		return decimal.NewFromInt(int64(item.CountUniqueUsage))
	case types.AggregationMax:
		return item.MaxUsage
	case types.AggregationLatest:
		return item.LatestUsage
	case types.AggregationSum, types.AggregationSumWithMultiplier, types.AggregationAvg, types.AggregationWeightedSum:
		return item.TotalUsage
	default:
		// Default to SUM for unknown types
		return item.TotalUsage
	}
}

// getCorrectUsageValueForPoint returns the correct usage value for a time series point based on aggregation type
func (s *featureUsageTrackingService) getCorrectUsageValueForPoint(point events.UsageAnalyticPoint, aggregationType types.AggregationType) decimal.Decimal {
	switch aggregationType {
	case types.AggregationCountUnique:
		return decimal.NewFromInt(int64(point.CountUniqueUsage))
	case types.AggregationMax:
		return point.MaxUsage
	case types.AggregationLatest:
		return point.LatestUsage
	case types.AggregationSum, types.AggregationSumWithMultiplier, types.AggregationAvg, types.AggregationWeightedSum:
		return point.Usage
	default:
		// Default to SUM for unknown types
		return point.Usage
	}
}

// ReprocessEvents triggers reprocessing of events for a customer or with other filters
func (s *featureUsageTrackingService) ReprocessEvents(ctx context.Context, params *events.ReprocessEventsParams) error {
	s.Logger.Infow("starting event reprocessing for feature usage tracking",
		"external_customer_id", params.ExternalCustomerID,
		"event_name", params.EventName,
		"start_time", params.StartTime,
		"end_time", params.EndTime,
	)

	// Set default batch size if not provided
	batchSize := params.BatchSize
	if batchSize <= 0 {
		batchSize = 100
	}

	// Create find params from reprocess params
	findParams := &events.FindUnprocessedEventsParams{
		ExternalCustomerID: params.ExternalCustomerID,
		EventName:          params.EventName,
		StartTime:          params.StartTime,
		EndTime:            params.EndTime,
		BatchSize:          batchSize,
	}

	// We'll process in batches to avoid memory issues with large datasets
	processedBatches := 0
	totalEventsFound := 0
	totalEventsPublished := 0
	var lastID string
	var lastTimestamp time.Time

	// Keep processing batches until we're done
	for {
		// Update keyset pagination parameters for next batch
		if lastID != "" && !lastTimestamp.IsZero() {
			findParams.LastID = lastID
			findParams.LastTimestamp = lastTimestamp
		}

		// Find unprocessed events
		unprocessedEvents, err := s.eventRepo.FindUnprocessedEvents(ctx, findParams)
		if err != nil {
			return ierr.WithError(err).
				WithHint("Failed to find unprocessed events").
				WithReportableDetails(map[string]interface{}{
					"external_customer_id": params.ExternalCustomerID,
					"event_name":           params.EventName,
					"batch":                processedBatches,
				}).
				Mark(ierr.ErrDatabase)
		}

		eventsCount := len(unprocessedEvents)
		totalEventsFound += eventsCount
		s.Logger.Infow("found unprocessed events",
			"batch", processedBatches,
			"count", eventsCount,
			"total_found", totalEventsFound,
		)

		// If no more events, we're done
		if eventsCount == 0 {
			break
		}

		// Publish each event to the feature usage tracking topic
		for _, event := range unprocessedEvents {
			// hardcoded delay to avoid rate limiting
			// TODO: remove this to make it configurable
			if err := s.PublishEvent(ctx, event, true); err != nil {
				s.Logger.Errorw("failed to publish event for reprocessing for feature usage tracking",
					"event_id", event.ID,
					"error", err,
				)
				// Continue with other events instead of failing the whole batch
				continue
			}
			totalEventsPublished++

			// Update the last seen ID and timestamp for next batch
			lastID = event.ID
			lastTimestamp = event.Timestamp
		}

		s.Logger.Infow("published events for reprocessing for feature usage tracking",
			"batch", processedBatches,
			"count", eventsCount,
			"total_published", totalEventsPublished,
		)

		// Update for next batch
		processedBatches++

		// If we didn't get a full batch, we're done
		if eventsCount < batchSize {
			break
		}
	}

	s.Logger.Infow("completed event reprocessing for feature usage tracking",
		"external_customer_id", params.ExternalCustomerID,
		"event_name", params.EventName,
		"batches_processed", processedBatches,
		"total_events_found", totalEventsFound,
		"total_events_published", totalEventsPublished,
	)

	return nil
}

// isSubscriptionValidForEvent checks if a subscription is valid for processing the given event
// It ensures the event timestamp falls within the subscription's active period
func (s *featureUsageTrackingService) isSubscriptionValidForEvent(
	sub *dto.SubscriptionResponse,
	event *events.Event,
) bool {
	// Event must be after subscription start date
	if event.Timestamp.Before(sub.StartDate) {
		s.Logger.Debugw("event timestamp before subscription start date",
			"event_id", event.ID,
			"subscription_id", sub.ID,
			"event_timestamp", event.Timestamp,
			"subscription_start_date", sub.StartDate,
		)
		return false
	}

	// If subscription has an end date, event must be before or equal to it
	if sub.EndDate != nil && event.Timestamp.After(*sub.EndDate) {
		s.Logger.Debugw("event timestamp after subscription end date",
			"event_id", event.ID,
			"subscription_id", sub.ID,
			"event_timestamp", event.Timestamp,
			"subscription_end_date", *sub.EndDate,
		)
		return false
	}

	// Additional check: if subscription is cancelled, make sure event is before cancellation
	if sub.SubscriptionStatus == types.SubscriptionStatusCancelled && sub.CancelledAt != nil {
		if event.Timestamp.After(*sub.CancelledAt) {
			s.Logger.Debugw("event timestamp after subscription cancellation date",
				"event_id", event.ID,
				"subscription_id", sub.ID,
				"event_timestamp", event.Timestamp,
				"subscription_cancelled_at", *sub.CancelledAt,
			)
			return false
		}
	}

	return true
}

func (s *featureUsageTrackingService) ToGetUsageAnalyticsResponseDTO(ctx context.Context, analytics []*events.DetailedUsageAnalytic) (*dto.GetUsageAnalyticsResponse, error) {
	response := &dto.GetUsageAnalyticsResponse{
		TotalCost: decimal.Zero,
		Currency:  "",
		Items:     make([]dto.UsageAnalyticItem, 0, len(analytics)),
	}

	// Convert analytics to response items
	for _, analytic := range analytics {
		item := dto.UsageAnalyticItem{
			FeatureID:       analytic.FeatureID,
			FeatureName:     analytic.FeatureName,
			EventName:       analytic.EventName,
			Source:          analytic.Source,
			Unit:            analytic.Unit,
			UnitPlural:      analytic.UnitPlural,
			AggregationType: analytic.AggregationType,
			TotalUsage:      analytic.TotalUsage, // This is now set correctly in enrichment
			TotalCost:       analytic.TotalCost,
			Currency:        analytic.Currency,
			EventCount:      analytic.EventCount,
			Properties:      analytic.Properties,
			Points:          make([]dto.UsageAnalyticPoint, 0, len(analytic.Points)),
		}

		// Map time-series points if available
		for _, point := range analytic.Points {
			// Use the correct usage value based on aggregation type
			correctUsage := s.getCorrectUsageValueForPoint(point, analytic.AggregationType)
			item.Points = append(item.Points, dto.UsageAnalyticPoint{
				Timestamp:  point.Timestamp,
				Usage:      correctUsage,
				Cost:       point.Cost,
				EventCount: point.EventCount,
			})
		}

		response.Items = append(response.Items, item)
		response.TotalCost = response.TotalCost.Add(analytic.TotalCost)
		response.Currency = analytic.Currency
	}

	// sort by feature name
	sort.Slice(response.Items, func(i, j int) bool {
		return response.Items[i].FeatureName < response.Items[j].FeatureName
	})

	return response, nil
}

func (s *featureUsageTrackingService) getTotalUsageForWeightedSumAggregation(
	subscription *subscription.Subscription,
	event *events.Event,
	propertyValue decimal.Decimal,
	periodID uint64,
) (decimal.Decimal, error) {
	// Convert periodID (epoch milliseconds) back to time for the period start
	periodStart := time.UnixMilli(int64(periodID))

	// Calculate the period end using the subscription's billing configuration
	periodEnd, err := types.NextBillingDate(periodStart, subscription.BillingAnchor, subscription.BillingPeriodCount, subscription.BillingPeriod, nil)
	if err != nil {
		return decimal.Zero, ierr.WithError(err).
			WithHint("Failed to calculate period end for weighted sum aggregation").
			WithReportableDetails(map[string]interface{}{
				"subscription_id": subscription.ID,
				"period_id":       periodID,
				"period_start":    periodStart,
			}).
			Mark(ierr.ErrValidation)
	}

	// Calculate total billing period duration in seconds
	totalPeriodSeconds := periodEnd.Sub(periodStart).Seconds()
	if totalPeriodSeconds <= 0 {
		return decimal.Zero, ierr.NewError("invalid billing period duration").
			WithHint("Billing period duration must be positive").
			WithReportableDetails(map[string]interface{}{
				"subscription_id": subscription.ID,
				"period_id":       periodID,
				"period_start":    periodStart,
				"period_end":      periodEnd,
				"total_seconds":   totalPeriodSeconds,
			}).
			Mark(ierr.ErrValidation)
	}

	// Calculate remaining seconds from event timestamp to period end
	remainingSeconds := math.Max(0, periodEnd.Sub(event.Timestamp).Seconds())

	// Apply weighted sum formula: (value / billing_period_seconds) * remaining_seconds
	// This gives us the proportion of the value that should be counted for the remaining period
	weightedUsage := propertyValue.Div(decimal.NewFromFloat(totalPeriodSeconds)).Mul(decimal.NewFromFloat(remainingSeconds))

	return weightedUsage, nil
}
